{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "deletable": true,
    "editable": true
   },
   "source": [
    "# kafkaReceiveDataPy\n",
    "This notebook receives data from Kafka on the topic 'test', and stores it in the 'time_test' table of Cassandra.\n",
    "\n",
    "```\n",
    "CREATE KEYSPACE test_time WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};\n",
    "\n",
    "CREATE TABLE test_time.sent_received(\n",
    " time_sent TEXT,\n",
    " time_received TEXT,\n",
    "PRIMARY KEY (time_sent)\n",
    ");\n",
    "```\n",
    "\n",
    "A message that gives the current time is received every second. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "deletable": true,
    "editable": true
   },
   "source": [
    "## Add dependencies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'\n",
    "import time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "deletable": true,
    "editable": true
   },
   "source": [
    "## Load modules and start SparkContext\n",
    "Note that SparkContext must be started to effectively load the package dependencies. Two cores are used, since one is needed for running the Kafka receiver."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SQLContext, Row\n",
    "conf = SparkConf() \\\n",
    "    .setAppName(\"Streaming test in Docker Swarm\") \\\n",
    "    .setMaster(\"spark://spark-master:7077\") \\\n",
    "    .set(\"spark.cassandra.connection.host\", \"cassandra-seed\")\n",
    "sc = SparkContext(conf=conf) \n",
    "sqlContext=SQLContext(sc)\n",
    "from pyspark.streaming import StreamingContext\n",
    "from pyspark.streaming.kafka import KafkaUtils"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "deletable": true,
    "editable": true
   },
   "source": [
    "## SaveToCassandra function\n",
    "Takes a list of tuple (rows) and save to Cassandra "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "def saveToCassandra(rows):\n",
    "    if not rows.isEmpty(): \n",
    "        sqlContext.createDataFrame(rows).write\\\n",
    "        .format(\"org.apache.spark.sql.cassandra\")\\\n",
    "        .mode('append')\\\n",
    "        .options(table=\"sent_received\", keyspace=\"test_time\")\\\n",
    "        .save()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "source": [
    "## Create streaming task\n",
    "* Receive data from Kafka 'test' topic every five seconds\n",
    "* Get stream content, and add receiving time to each message\n",
    "* Save each RDD in the DStream to Cassandra. Also print on screen"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "ssc = StreamingContext(sc, 5)\n",
    "kvs = KafkaUtils.createStream(ssc, \"kafka:2181\", \"spark-streaming-consumer\", {'test': 1})\n",
    "data = kvs.map(lambda x: x[1])\n",
    "rows= data.map(lambda x:Row(time_sent=x,time_received=time.strftime(\"%Y-%m-%d %H:%M:%S\")))\n",
    "rows.foreachRDD(saveToCassandra)\n",
    "rows.pprint()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "deletable": true,
    "editable": true
   },
   "source": [
    "## Start streaming"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "ssc.start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "deletable": true,
    "editable": true
   },
   "source": [
    "## Stop streaming"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "ssc.stop(stopSparkContext=False,stopGraceFully=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "deletable": true,
    "editable": true
   },
   "source": [
    "## Get Cassandra table content"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "data=sqlContext.read\\\n",
    "    .format(\"org.apache.spark.sql.cassandra\")\\\n",
    "    .options(table=\"sent_received\", keyspace=\"test_time\")\\\n",
    "    .load()\n",
    "data.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "source": [
    "## Get Cassandra table content using SQL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": [
    "data.registerTempTable(\"sent_received\");\n",
    "data.printSchema()\n",
    "data=sqlContext.sql(\"select * from sent_received\")\n",
    "data.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true,
    "deletable": true,
    "editable": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
